/**
 * Counts frequent (more than 1000 occurrences) failure causes for different
 * categories of events, grouped by job duration (short, medium, long).
 * 
 * @author cristina
 */

package org.PP;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class FailureCause {

	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, LongWritable> {

		// Threshold tables: a value greater than the previous key but not
		// greater than the current key is classified with the current key's
		// label (keys are inclusive upper bounds).
		public static HashMap<Integer, String> failureCause = new HashMap<Integer, String>();
		public static HashMap<Integer, String> durationGroup = new HashMap<Integer, String>();

		// Ascending key lists built once by init(). This hoists the
		// per-record LinkedList allocation + Collections.sort that was
		// previously done for every input line.
		private static final ArrayList<Integer> faultThresholds = new ArrayList<Integer>();
		private static final ArrayList<Integer> durationThresholds = new ArrayList<Integer>();

		public static void init() {
			// Idempotent guard: map() may call this once per record, but the
			// tables are only populated (and sorted) once per JVM.
			if (!failureCause.isEmpty()) {
				return;
			}

			// failure cause (inclusive upper bound of the code range -> label)
			failureCause.put(Integer.valueOf(-1), "not reported");
			failureCause.put(Integer.valueOf(0), "reported as undetermined");
			failureCause.put(Integer.valueOf(999), "infrastructure");
			failureCause.put(Integer.valueOf(1999), "hardware");
			failureCause.put(Integer.valueOf(2999), "IO");
			failureCause.put(Integer.valueOf(3999), "network");
			failureCause.put(Integer.valueOf(4999), "software");
			failureCause.put(Integer.valueOf(5999), "human error");
			failureCause.put(Integer.valueOf(6999), "user");
			failureCause.put(Integer.valueOf(7000), "end of measurement");

			// group by duration (units as in the trace -- presumably seconds,
			// TODO confirm against the input format)
			durationGroup.put(Integer.valueOf(1000), "short");
			durationGroup.put(Integer.valueOf(100000), "medium");
			durationGroup.put(Integer.valueOf(Integer.MAX_VALUE), "long");

			faultThresholds.addAll(failureCause.keySet());
			Collections.sort(faultThresholds);
			durationThresholds.addAll(durationGroup.keySet());
			Collections.sort(durationThresholds);
		}

		// Returns the label of the smallest threshold >= v, or "" when v
		// exceeds every threshold (same semantics as the original linear scan
		// over the sorted keyset).
		private static String classify(double v, ArrayList<Integer> thresholds,
				HashMap<Integer, String> labels) {
			for (Integer t : thresholds) {
				if (v <= t.intValue()) {
					return labels.get(t);
				}
			}
			return "";
		}

		/**
		 * Emits ("&lt;duration&gt; event - &lt;cause&gt; fault", 1) for every
		 * non-comment input line. Columns 7, 8 and 9 of the
		 * whitespace-separated record are the job start time, end time and
		 * failure code respectively.
		 */
		public void map(LongWritable key, Text value,
				OutputCollector<Text, LongWritable> output, Reporter reporter)
				throws IOException {

			init();

			String line = value.toString();

			// skip comment lines
			if (line.startsWith("#")) {
				return;
			}

			double start = 0, end = 0;
			int fault = 0;
			int count = 0;

			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				count++;
				String token = tokenizer.nextToken();
				if (count == 7) {
					start = Double.parseDouble(token);
				} else if (count == 8) {
					end = Double.parseDouble(token);
				} else if (count == 9) {
					// "NULL" means the trace did not report a failure cause
					fault = token.equals("NULL") ? -1 : Integer.parseInt(token);
				}
			}

			String durationClass = classify(end - start, durationThresholds,
					durationGroup);
			String faultClass = classify(fault, faultThresholds, failureCause);

			// fault codes above every threshold fall through to a catch-all
			if (faultClass.isEmpty()) {
				faultClass = "TYPING";
			}

			output.collect(new Text(durationClass + " event - " + faultClass
					+ " fault"), new LongWritable(1));
		}
	}

	public static class Combine extends MapReduceBase implements
			Reducer<Text, LongWritable, Text, LongWritable> {

		/**
		 * Local pre-aggregation: sums the per-record 1-counts for a key on
		 * the map side so less data crosses the network to the reducers.
		 * Unlike {@link Reduce}, it applies no frequency filter.
		 */
		public void reduce(Text key, Iterator<LongWritable> values,
				OutputCollector<Text, LongWritable> output, Reporter reporter)
				throws IOException {

			// primitive long: avoids autoboxing a new Long on every addition
			long sum = 0;

			while (values.hasNext()) {
				sum += values.next().get();
			}

			output.collect(key, new LongWritable(sum));
		}
	}
	
	public static class Reduce extends MapReduceBase implements
			Reducer<Text, LongWritable, Text, LongWritable> {

		// only causes occurring more than this many times are emitted
		private static final long FREQUENT_THRESHOLD = 1000L;

		/**
		 * Sums the counts for a (duration class, fault class) key and emits
		 * the total only when it exceeds {@code FREQUENT_THRESHOLD}.
		 */
		public void reduce(Text key, Iterator<LongWritable> values,
				OutputCollector<Text, LongWritable> output, Reporter reporter)
				throws IOException {

			// primitive long: avoids autoboxing a new Long on every addition
			long sum = 0;

			while (values.hasNext()) {
				sum += values.next().get();
			}

			if (sum > FREQUENT_THRESHOLD) {
				output.collect(key, new LongWritable(sum));
			}
		}
	}

	/**
	 * Configures and submits the job. Expects two arguments: the input path
	 * and the output path.
	 */
	public static void main(String[] args) throws Exception {

		// fail fast with a usage message instead of an
		// ArrayIndexOutOfBoundsException on missing arguments
		if (args.length < 2) {
			System.err.println("Usage: FailureCause <input path> <output path>");
			System.exit(2);
		}

		JobConf conf = new JobConf(FailureCause.class);
		conf.setJobName("failureCause");

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(LongWritable.class);

		conf.setMapperClass(Map.class);
		// combiner pre-sums on the map side; the reducer applies the >1000 filter
		conf.setCombinerClass(Combine.class);
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		conf.setNumReduceTasks(5);

		JobClient.runJob(conf);
	}
}
